/*
 * Copyright (c) 2001, 2012, Oracle and/or its affiliates. All rights reserved.
 * ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 */

package com.sun.corba.se.impl.transport;

import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.omg.CORBA.CompletionStatus;
import org.omg.CORBA.SystemException;

import com.sun.corba.se.pept.encoding.InputObject;
import com.sun.corba.se.pept.encoding.OutputObject;
import com.sun.corba.se.pept.protocol.MessageMediator;

import com.sun.corba.se.spi.logging.CORBALogDomains;
import com.sun.corba.se.spi.orb.ORB;
import com.sun.corba.se.spi.protocol.CorbaMessageMediator;
import com.sun.corba.se.spi.transport.CorbaConnection;
import com.sun.corba.se.spi.transport.CorbaResponseWaitingRoom;

import com.sun.corba.se.impl.encoding.BufferManagerReadStream;
import com.sun.corba.se.impl.encoding.CDRInputObject;
import com.sun.corba.se.impl.logging.ORBUtilSystemException;
import com.sun.corba.se.impl.orbutil.ORBUtility;
import com.sun.corba.se.impl.protocol.giopmsgheaders.LocateReplyOrReplyMessage;
import com.sun.corba.se.impl.protocol.giopmsgheaders.ReplyMessage;

/**
 * @author Harold Carr
 */
public class CorbaResponseWaitingRoomImpl
    implements
    CorbaResponseWaitingRoom {

  final static class OutCallDesc {

    java.lang.Object done = new java.lang.Object();
    Thread thread;
    MessageMediator messageMediator;
    SystemException exception;
    InputObject inputObject;
  }

  private ORB orb;
  private ORBUtilSystemException wrapper;

  private CorbaConnection connection;
  // Maps requestId to an OutCallDesc.
  final private Map<Integer, OutCallDesc> out_calls;

  public CorbaResponseWaitingRoomImpl(ORB orb, CorbaConnection connection) {
    this.orb = orb;
    wrapper = ORBUtilSystemException.get(orb,
        CORBALogDomains.RPC_TRANSPORT);
    this.connection = connection;
    out_calls =
        Collections.synchronizedMap(new HashMap<Integer, OutCallDesc>());
  }

  ////////////////////////////////////////////////////
  //
  // pept.transport.ResponseWaitingRoom
  //

  public void registerWaiter(MessageMediator mediator) {
    CorbaMessageMediator messageMediator = (CorbaMessageMediator) mediator;

    if (orb.transportDebugFlag) {
      dprint(".registerWaiter: " + opAndId(messageMediator));
    }

    Integer requestId = messageMediator.getRequestIdInteger();

    OutCallDesc call = new OutCallDesc();
    call.thread = Thread.currentThread();
    call.messageMediator = messageMediator;
    out_calls.put(requestId, call);
  }

  public void unregisterWaiter(MessageMediator mediator) {
    CorbaMessageMediator messageMediator = (CorbaMessageMediator) mediator;

    if (orb.transportDebugFlag) {
      dprint(".unregisterWaiter: " + opAndId(messageMediator));
    }

    Integer requestId = messageMediator.getRequestIdInteger();

    out_calls.remove(requestId);
  }

  public InputObject waitForResponse(MessageMediator mediator) {
    CorbaMessageMediator messageMediator = (CorbaMessageMediator) mediator;

    try {

      InputObject returnStream = null;

      if (orb.transportDebugFlag) {
        dprint(".waitForResponse->: " + opAndId(messageMediator));
      }

      Integer requestId = messageMediator.getRequestIdInteger();

      if (messageMediator.isOneWay()) {
        // The waiter is removed in releaseReply in the same
        // way as a normal request.

        if (orb.transportDebugFlag) {
          dprint(".waitForResponse: one way - not waiting: "
              + opAndId(messageMediator));
        }

        return null;
      }

      OutCallDesc call = out_calls.get(requestId);
      if (call == null) {
        throw wrapper.nullOutCall(CompletionStatus.COMPLETED_MAYBE);
      }

      synchronized (call.done) {

        while (call.inputObject == null && call.exception == null) {
          // Wait for the reply from the server.
          // The ReaderThread reads in the reply IIOP message
          // and signals us.
          try {
            if (orb.transportDebugFlag) {
              dprint(".waitForResponse: waiting: "
                  + opAndId(messageMediator));
            }
            call.done.wait();
          } catch (InterruptedException ie) {
          }
          ;
        }

        if (call.exception != null) {
          if (orb.transportDebugFlag) {
            dprint(".waitForResponse: exception: "
                + opAndId(messageMediator));
          }
          throw call.exception;
        }

        returnStream = call.inputObject;
      }

      // REVISIT -- exceptions from unmarshaling code will
      // go up through this client thread!

      if (returnStream != null) {
        // On fragmented streams the header MUST be unmarshaled here
        // (in the client thread) in case it blocks.
        // If the header was already unmarshaled, this won't
        // do anything
        // REVISIT: cast - need interface method.
        ((CDRInputObject) returnStream).unmarshalHeader();
      }

      return returnStream;

    } finally {
      if (orb.transportDebugFlag) {
        dprint(".waitForResponse<-: " + opAndId(messageMediator));
      }
    }
  }

  public void responseReceived(InputObject is) {
    CDRInputObject inputObject = (CDRInputObject) is;
    LocateReplyOrReplyMessage header = (LocateReplyOrReplyMessage)
        inputObject.getMessageHeader();
    Integer requestId = new Integer(header.getRequestId());
    OutCallDesc call = out_calls.get(requestId);

    if (orb.transportDebugFlag) {
      dprint(".responseReceived: id/"
          + requestId + ": "
          + header);
    }

    // This is an interesting case.  It could mean that someone sent us a
    // reply message, but we don't know what request it was for.  That
    // would probably call for an error.  However, there's another case
    // that's normal and we should think about --
    //
    // If the unmarshaling thread does all of its work inbetween the time
    // the ReaderThread gives it the last fragment and gets to the
    // out_calls.get line, then it will also be null, so just return;
    if (call == null) {
      if (orb.transportDebugFlag) {
        dprint(".responseReceived: id/"
            + requestId
            + ": no waiter: "
            + header);
      }
      return;
    }

    // Set the reply InputObject and signal the client thread
    // that the reply has been received.
    // The thread signalled will remove outcall descriptor if appropriate.
    // Otherwise, it'll be removed when last fragment for it has been put on
    // BufferManagerRead's queue.
    synchronized (call.done) {
      CorbaMessageMediator messageMediator = (CorbaMessageMediator)
          call.messageMediator;

      if (orb.transportDebugFlag) {
        dprint(".responseReceived: "
            + opAndId(messageMediator)
            + ": notifying waiters");
      }

      messageMediator.setReplyHeader(header);
      messageMediator.setInputObject(is);
      inputObject.setMessageMediator(messageMediator);
      call.inputObject = is;
      call.done.notify();
    }
  }

  public int numberRegistered() {
    return out_calls.size();
  }

  //////////////////////////////////////////////////
  //
  // CorbaResponseWaitingRoom
  //

  public void signalExceptionToAllWaiters(SystemException systemException) {

    if (orb.transportDebugFlag) {
      dprint(".signalExceptionToAllWaiters: " + systemException);
    }

    synchronized (out_calls) {
      if (orb.transportDebugFlag) {
        dprint(".signalExceptionToAllWaiters: out_calls size :" +
            out_calls.size());
      }

      for (OutCallDesc call : out_calls.values()) {
        if (orb.transportDebugFlag) {
          dprint(".signalExceptionToAllWaiters: signaling " +
              call);
        }
        synchronized (call.done) {
          try {
            // anything waiting for BufferManagerRead's fragment queue
            // needs to be cancelled
            CorbaMessageMediator corbaMsgMediator =
                (CorbaMessageMediator) call.messageMediator;
            CDRInputObject inputObject =
                (CDRInputObject) corbaMsgMediator.getInputObject();
            // IMPORTANT: If inputObject is null, then no need to tell
            //            BufferManagerRead to cancel request processing.
            if (inputObject != null) {
              BufferManagerReadStream bufferManager =
                  (BufferManagerReadStream) inputObject.getBufferManager();
              int requestId = corbaMsgMediator.getRequestId();
              bufferManager.cancelProcessing(requestId);
            }
          } catch (Exception e) {
          } finally {
            // attempt to wake up waiting threads in all cases
            call.inputObject = null;
            call.exception = systemException;
            call.done.notifyAll();
          }
        }
      }
    }
  }

  public MessageMediator getMessageMediator(int requestId) {
    Integer id = new Integer(requestId);
    OutCallDesc call = out_calls.get(id);
    if (call == null) {
      // This can happen when getting early reply fragments for a
      // request which has completed (e.g., client marshaling error).
      return null;
    }
    return call.messageMediator;
  }

  ////////////////////////////////////////////////////
  //
  // Implementation.
  //

  protected void dprint(String msg) {
    ORBUtility.dprint("CorbaResponseWaitingRoomImpl", msg);
  }

  protected String opAndId(CorbaMessageMediator mediator) {
    return ORBUtility.operationNameAndRequestId(mediator);
  }
}

// End of file.
